package drds.plus.datasource.connection_restrict;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 将时间片分为多个槽，每个槽一个计数器。游标按时间循环遍历每个槽。游标移动时才清零并且只清零当前的槽； 因为采用mod计算(cursorIndex =
 * currentTime % timeslice/aSlotTime)，游标到头会自动折回来，事实上是一个环
 *
 * <pre>
 *                     cursorIndex
 *                       |
 * +---------------------+-------------------------+
 * |   |   |   |   |   | C |   |   |   |   |   |   |
 * +-----------------------------------------------+
 * |                                               |
 *  \-----------------timeslice-------------------/
 *
 * </pre>
 */
public class TimeSliceFlowControl {

    private final static int MAX_SLOT = 20; // 最多20片
    private final static int MIN_SLOT_TIME = 500; // slot时间最少500毫秒

    private final String name;
    private final AtomicInteger[] slots; // 槽数组，最小的时间粒度数组
    private final int aSlotTimeMillis; // 一个槽的时间，最小的时间单位
    private final int totalaSlotTimeMillis; // 总的时间窗口（时间片）大小
    private final int maxInstancePerTimeSlice; // 时间片内允许的最大访问次数(进入个数)

    private final AtomicInteger total = new AtomicInteger(); // 总的计数
    private final AtomicInteger totalReject = new AtomicInteger(); // 总的拒绝/超限计数
    private volatile int cursorIndex = 0; // 游标
    private volatile long cursorTimeMillis = System.currentTimeMillis(); // 当前slot的开始时间

    /**
     * @param name                    流控的名称
     * @param aSlotTimeMillis         //一个槽的时间
     * @param slotCount               //槽的数目
     * @param maxInstancePerTimeSlice //时间窗口内最多允许执行的次数，设为0则不限制
     */
    public TimeSliceFlowControl(String name, int aSlotTimeMillis, int slotCount, int maxInstancePerTimeSlice) {
        this.name = name;
        //
        this.aSlotTimeMillis = aSlotTimeMillis;
        if (slotCount < 2) {
            throw new IllegalArgumentException("slot至少要有两个");
        }
        this.totalaSlotTimeMillis = aSlotTimeMillis * slotCount;
        //
        slots = new AtomicInteger[slotCount];
        for (int i = 0; i < slotCount; i++) {
            slots[i] = new AtomicInteger(0);
        }
        //
        this.maxInstancePerTimeSlice = maxInstancePerTimeSlice;
    }

    /**
     * 最小的时间单位取默认的500毫秒
     *
     * @param name                    流控的名称
     * @param totalaSlotTimeMillis    时间片; 传0表示使用默认值1分钟
     * @param maxInstancePerTimeSlice 时间片内最多允许执行多少次，设为0则不限制
     */
    public TimeSliceFlowControl(String name, int totalaSlotTimeMillis, int maxInstancePerTimeSlice) {
        this.name = name;
        //
        if (totalaSlotTimeMillis == 0) {
            totalaSlotTimeMillis = 60 * 1000; // 时间片默认1分钟
        }
        if (totalaSlotTimeMillis < 2 * MIN_SLOT_TIME) {
            throw new IllegalArgumentException("时间片最少" + (2 * MIN_SLOT_TIME));
        }
        //
        int slotCount = MAX_SLOT; // 默认分20个slot
        int aSlotTimeMillis = totalaSlotTimeMillis / slotCount;
        if (aSlotTimeMillis < MIN_SLOT_TIME) {
            aSlotTimeMillis = MIN_SLOT_TIME; // 如果slot时间小于MIN_SLOT_TIME，则最小半秒
            slotCount = totalaSlotTimeMillis / aSlotTimeMillis;
        }
        //
        this.aSlotTimeMillis = aSlotTimeMillis;
        // this.totalaSlotTimeMillis = totalaSlotTimeMillis; //直接赋值因为截余的关系，会数组越界
        this.totalaSlotTimeMillis = this.aSlotTimeMillis * slotCount;
        slots = new AtomicInteger[slotCount];
        for (int i = 0; i < slotCount; i++) {
            slots[i] = new AtomicInteger(0);
        }
        this.maxInstancePerTimeSlice = maxInstancePerTimeSlice;
    }

    public void check() {
        if (!allow()) {
            throw new IllegalStateException(reportExceed());
        }
    }

    public String reportExceed() {
        return name + " exceed the limit " + maxInstancePerTimeSlice + " in timeslice " + totalaSlotTimeMillis;
    }

    public boolean allow() {
        final long currentTimeMillis = System.currentTimeMillis();
        final int index = (int) ((currentTimeMillis % totalaSlotTimeMillis) / aSlotTimeMillis);//left
        if (index != this.cursorIndex) {
            int oldIndex = this.cursorIndex;
            this.cursorIndex = index; // 尽快赋新值
            //
            final long oldCursorTimeMillis = cursorTimeMillis;
            cursorTimeMillis = currentTimeMillis; // 尽快赋新值
            //
            // 多个线程会进入下面这里，但是每个线程计算的total会大致相同
            if (currentTimeMillis - oldCursorTimeMillis > totalaSlotTimeMillis) {
                // 时间差大于timesliceMillis，则整个时间片都应该清零了
                for (int i = 0; i < slots.length; i++) {
                    slots[i].set(0); // 清零，忽略并发造成的计数出入
                }
                this.total.set(0);
            } else {
                do {
                    // 吃尾（尾清零），考虑跳跃的情况
                    oldIndex++;
                    if (oldIndex >= slots.length) {
                        oldIndex = 0;
                    }
                    slots[oldIndex].set(0); // 清零，忽略并发造成的计数出入
                } while (oldIndex != index);
                // int clearCount = slots[cursorIndex].get();
                // slots[cursorIndex].set(0); //清零，忽略并发造成的计数出入
                int newtotal = 0;
                for (int i = 0; i < slots.length; i++) {
                    newtotal += slots[i].get(); // 包括了新的当前槽
                }
                this.total.set(newtotal); // 设置总数，忽略并发造成的计数出入
            }
        } else {
            if (currentTimeMillis - cursorTimeMillis > aSlotTimeMillis) {
                // index相同但是时间差大于一个slot的时间，说明整个时间片都需要清零了
                cursorTimeMillis = currentTimeMillis; // 尽快赋新值
                for (int i = 0; i < slots.length; i++) {
                    slots[i].set(0); // 清零，忽略并发造成的计数出入
                }
                this.total.set(0);
            }
            // 是否为了避免开销，不做上面的判断？
        }

        if (maxInstancePerTimeSlice == 0) {
            return true; // 0为不限制
        }
        if (this.total.get() < maxInstancePerTimeSlice) {
            // 放来的才计数，拒绝的不计数
            slots[index].incrementAndGet();
            total.incrementAndGet();
            return true;
        } else {
            totalReject.incrementAndGet();
            return false;
        }
    }

    /**
     * @return 当前时间片内的总执行次数
     */
    public int getCurrentCount() {
        return total.get();
    }

    /**
     * @return 返回有史以来(对象创建以来)被拒绝/超限的次数
     */
    public int getTotalRejectCount() {
        return totalReject.get();
    }
}
